可以在consumer调用subscribe时传入一个ConsumerRebalanceListener,借助它在分区重新分配时保存和恢复offset,从而实现exactly once:
// Create the consumer; the diamond operator avoids constructing a raw KafkaConsumer.
consumer = new KafkaConsumer<>(getKafkaConsumerConfig());
// this.topic is split on "," so one setting can name several topics. The listener is
// registered with the subscription and receives the same consumer and offsetManager.
consumer.subscribe(Arrays.asList(this.topic.split(",")),
        new OffsetTrackingRebalanceListener(consumer, offsetManager));
再看OffsetTrackingRebalanceListener,它就是ConsumerRebalanceListener的实现:
/**
 * Rebalance listener that persists and restores consumer positions through an
 * {@link OffsetManager}.
 *
 * <p>When partitions are revoked, the consumer's current position for each of them is
 * written to the offset manager. When partitions are assigned, the consumer is moved to
 * the offset the offset manager returns for each of them.
 */
public class OffsetTrackingRebalanceListener implements ConsumerRebalanceListener {

    private Consumer<String, String> consumer;
    private OffsetManager offsetManager;

    /**
     * @param consumer      the consumer whose positions are read and repositioned
     * @param offsetManager the store used to save and look up offsets
     */
    public OffsetTrackingRebalanceListener(Consumer<String, String> consumer,OffsetManager offsetManager) {
        this.offsetManager = offsetManager;
        this.consumer = consumer;
    }

    /**
     * Saves the consumer's current position for every revoked partition.
     *
     * @param partitions the partitions being taken away from this consumer
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition revoked : partitions) {
            String topicName = revoked.topic();
            int partitionId = revoked.partition();
            long currentPosition = consumer.position(revoked);
            offsetManager.saveOffsetInExternalStore(topicName, partitionId, currentPosition);
        }
    }

    /**
     * Seeks the consumer to the stored offset of every newly assigned partition.
     *
     * @param partitions the partitions now assigned to this consumer
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition assigned : partitions) {
            long storedOffset =
                    offsetManager.readOffsetFromExternalStore(assigned.topic(), assigned.partition());
            consumer.seek(assigned, storedOffset);
        }
    }
}
OffsetTrackingRebalanceListener内部通过OffsetManager把offsets保存在外部存储中(这里用的是Redis):
/**
 * Saves and loads per-partition offsets through the supplied {@link RedisClient}.
 *
 * <p>Each topic/partition pair maps to one key built by {@link #storageName(String, int)};
 * the offset is stored as its decimal string form.
 */
public class OffsetManager {

    private final String storagePrefix;
    private final RedisClient redisClient;

    /**
     * @param redisClient   client used for all reads and writes
     * @param storagePrefix prefix embedded in every key this instance builds
     */
    public OffsetManager(RedisClient redisClient, String storagePrefix) {
        this.storagePrefix = storagePrefix;
        this.redisClient = redisClient;
    }

    /**
     * Writes the offset for the given topic and partition.
     *
     * @param topic     topic name
     * @param partition partition number
     * @param offset    offset to store
     * @throws RuntimeException if the write fails; the original exception is the cause
     */
    public void saveOffsetInExternalStore(String topic, int partition, long offset) {
        String key = storageName(topic, partition);
        try {
            redisClient.set(key, String.valueOf(offset));
        } catch (Exception e) {
            // The cause is preserved in the rethrown exception, so no separate stack dump.
            throw new RuntimeException("Failed to save offset " + offset + " under key " + key, e);
        }
    }

    /**
     * Reads the stored offset for the given topic and partition.
     *
     * @param topic     topic name
     * @param partition partition number
     * @return the stored offset, or 0 when no value (or an empty value) is stored
     * @throws RuntimeException if the read fails or the stored value is not a valid long;
     *                          the original exception is the cause
     */
    @SuppressWarnings({ "resource" })
    public long readOffsetFromExternalStore(String topic, int partition) {
        String key = storageName(topic, partition);
        try {
            String offset = redisClient.get(key);
            return StringUtils.isEmpty(offset) ? 0 : Long.parseLong(offset);
        } catch (Exception e) {
            // Fail loudly, as saveOffsetInExternalStore does. Returning 0 here would make
            // a failed lookup indistinguishable from "nothing stored", and the caller
            // would rewind the partition to its beginning.
            throw new RuntimeException("Failed to read offset under key " + key, e);
        }
    }

    /** Builds the storage key for a topic/partition pair. */
    private String storageName(String topic, int partition) {
        return "../position/" + storagePrefix + "_" + topic + "_" + partition;
    }
}
link:
Exactly-once Kafka Dynamic Consumer via Subscribe
Exactly-once using idempotent writes & using transactional writes